package com.starzy.hadoop.hdfs;

import java.util.LinkedList;

public class FSEditLog {
    private long txid=0L;
    private DoubleBuffer editLogBuffer=new DoubleBuffer();
    //当前是否正在往磁盘里面刷写数据
    private volatile Boolean isSyncRunning = false;
    private volatile Boolean isWaitSync = false;

    private volatile Long syncMaxTxid = 0L;
    /**
     * 一个线程 就会有自己一个ThreadLocal的副本
     */
    private ThreadLocal<Long> localTxid=new ThreadLocal<Long>();


    /**
     * 记录元数据日志方法
     * @param content
     */
    public void logEdit(String content){//   mkdir /data
        /**
         * 线程一
         * 线程二
         * 线程三
         *
         * 线程四
         * 线程五
         */
        //加了一把锁
        synchronized (this){
            /**
             * 编号不能重复，是递增的
             * 线程一：1
             * 线程二：2
             * 线程三：3
             *
             * 线程四：4
             * 线程五：5
             */
            txid++;
            /**
             * 线程一：localTxid = 1
             * 线程二：localTxid = 2
             * 线程三：localTxid = 3
             */
            localTxid.set(txid);
            EditLog log = new EditLog(txid, content);
            editLogBuffer.write(log);
        } //释放锁

        logSync();
    }

    private  void logSync(){
        /**
         * 线程一
         * 线程二
         */
        synchronized (this){
            //线程一：判断当前是否正在往磁盘上面写数据
            //线程二：判断当前是否正在往磁盘上面写数据
            //线程三：判断当前是否正在往磁盘上面写数据

            //线程四：判断当前是否正在往磁盘上面写数据
            //线程五：判断当前是否正在往磁盘上面写数据
            if(isSyncRunning){
                //线程二：2
                //线程三：3
                //线程四：4
                //线程五：5
                long txid = localTxid.get();
                //线程二：2 <= 3
                //线程三： 3 <= 3
                //线程四： 4 <= 3
                //线程五： 5 <= 3
                if(txid <= syncMaxTxid){
                    return;
                }
                //线程四不满足
                if(isWaitSync){
                    //线程5直接返回
                    return;
                }
                //赋值
                isWaitSync = true;
                //线程四满足了
                while(isSyncRunning){
                    try {
                        //线程四就在这儿等：1）要么就时间到了 2）被人唤醒了
                        wait(2000);//wait操作是会释放锁
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
                //线程四
                isWaitSync = false;
            }

            //进行内存交换。在源码里面是有个判断条件，达到一定地方才会交换内存。
            editLogBuffer.setReadyToSync();


            if(editLogBuffer.syncBuffer.size() > 0) {
                syncMaxTxid = editLogBuffer.getSyncMaxTxid();
            }
            //
            isSyncRunning = true;

        } //线程一 释放锁

        /**
         * 分段加锁
         * 线程一 刷写数据
         * 这个过程要稍微慢一些，因为往磁盘上面去写数据。
         * 线程一就会在这儿运行一会儿。
         */
        //50ms
        editLogBuffer.flush();

        //重新加锁
        synchronized (this) {
            //线程一 赋值为false
            isSyncRunning = false;
            //唤醒等待线程。
            notify();
        }

    }


    /**
     * 首先用面向对象的思想，把每一条元数据信息，都看做一个对象。
     */
    class EditLog{
        long txid;//每一条元数据都有编号，递增
        String content;//元数据的内容 mkdir /data |  delete /data

        //构造函数
        public EditLog(long txid,String content){
            this.txid = txid;
            this.content = content;
        }

        //方便我们打印日志
        @Override
        public String toString() {
            return "EditLog{" +
                    "txid=" + txid +
                    ", content='" + content + '\'' +
                    '}';
        }
    }

    /**
     * 双缓冲的方案
     */
    class DoubleBuffer{
        //第一个内存：接收数据
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        //第二个内存：把数据写到磁盘
        LinkedList<EditLog> syncBuffer= new LinkedList<EditLog>();

        /**
         * 写数据
         * @param log
         */
        public void write(EditLog log){
            currentBuffer.add(log);
        }


        /**
         *  内存交换
         */
        public void setReadyToSync(){
            LinkedList<EditLog> tmp= currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /**
         * 获取同步内存里面最大一个日志编号
         * @return
         */
        public Long getSyncMaxTxid(){
            return syncBuffer.getLast().txid;
        }


        /**
         * 把数据刷写到磁盘
         */
        public void flush(){
            for(EditLog log:syncBuffer){
                System.out.println("存入磁盘日志信息："+log);
            }
            syncBuffer.clear();
        }
    }
}
